Generation Flow of Fragment Mechanism

Steps:

  • load text fragment mechanism (text based: mech and smiles)

  • create fragments and fragment reactions (from smiles, check isomorphic duplicate, add reaction_repr for fragment reaction)

  • get thermo and kinetics

Input:

  • text fragment mechanism and smiles dict

Output:

  • chemkin file for fragment mechanism

IMPORTANT: USE RMG-Py frag_kinetics_gen_new branch, RMG-dabase frag_kinetics_gen_new branch, rmg_env


In [ ]:
import os
from tqdm import tqdm

In [ ]:
from rmgpy import settings
from rmgpy.data.rmg import RMGDatabase
from rmgpy.kinetics import KineticsData
from rmgpy.rmg.model import getFamilyLibraryObject
from rmgpy.data.kinetics.family import TemplateReaction
from rmgpy.data.kinetics.depository import DepositoryReaction
from rmgpy.data.kinetics.common import find_degenerate_reactions
from rmgpy.chemkin import saveChemkinFile, saveSpeciesDictionary

In [ ]:
import afm
import afm.fragment
import afm.reaction

0. helper methods


In [ ]:
def read_frag_mech(frag_mech_path):

	reaction_string_dict = {}
	current_family = ''
	with open(frag_mech_path) as f_in:
		for line in f_in:
			if line.startswith('#') and ':' in line:
				_, current_family = [token.strip() for token in line.split(':')]
			elif line.strip() and not line.startswith('#'):
				reaction_string = line.strip()
				if current_family not in reaction_string_dict:
					reaction_string_dict[current_family] = [reaction_string]
				else:
					reaction_string_dict[current_family].append(reaction_string)

	return reaction_string_dict

def parse_reaction_string(reaction_string):

	reactant_side, product_side = [token.strip() for token in reaction_string.split('==')]
	reactant_strings = [token.strip() for token in reactant_side.split('+')]
	product_strings = [token.strip() for token in product_side.split('+')]

	return reactant_strings, product_strings

1. load text-format fragment mech


In [ ]:
job_name = 'two-sided_newcut1'
afm_base = os.path.dirname(afm.__path__[0])
working_dir = os.path.join(afm_base, 'examples', 'pdd_chemistry', job_name)

In [ ]:
# load RMG database to create reactions
database = RMGDatabase()

In [ ]:
database.load(
    path = settings['database.directory'], 
    thermoLibraries = ['primaryThermoLibrary'], # can add others if necessary
    kineticsFamilies = 'all', 
    reactionLibraries = [], 
    kineticsDepositories = ''
)
thermodb = database.thermo
# Add training reactions
for family in database.kinetics.families.values():
    family.addKineticsRulesFromTrainingSet(thermoDatabase=thermodb)
# average up all the kinetics rules
for family in database.kinetics.families.values():
    family.fillKineticsRulesByAveragingUp()

In [ ]:
# load fragment from smiles-like string
fragment_smiles_filepath = os.path.join(working_dir, 'fragment_smiles.txt')

fragments = []
with open(fragment_smiles_filepath) as f_in:
    for line in f_in:
        if line.strip() and not line.startswith('#') and ':' in line:
            label, smiles = [token.strip() for token in line.split(":")]
            frag = afm.fragment.Fragment(label=label).from_SMILES_like_string(smiles)
            frag.assign_representative_species()
            frag.species_repr.label = label
            for prev_frag in fragments:
                if frag.isIsomorphic(prev_frag):
                    raise Exception('Isomorphic duplicate found: {0} and {1}'.format(label, prev_frag.label))
            fragments.append(frag)

# construct label-key fragment dictionary
fragment_dict = {}
for frag0 in fragments:
    if frag0.label not in fragment_dict:
        fragment_dict[frag0.label] = frag0
    else:
        raise Exception('Fragment with duplicated labels found: {0}'.format(frag0.label))

In [ ]:
# put aromatic isomer in front of species.molecule
# 'cause that's the isomer we want to react
for frag in fragments:
    species = frag.species_repr
    species.generateResonanceIsomers()
    for mol in species.molecule:
        if mol.isAromatic():
            species.molecule = [mol]
            break

In [ ]:
# load fragment mech in text
fragment_mech_filepath = os.path.join(working_dir, 'frag_mech.txt')

reaction_string_dict = read_frag_mech(fragment_mech_filepath)

# generate reactions
fragment_rxns = []

for family_label in reaction_string_dict:
    # parse reaction strings
    print "Processing {0}...".format(family_label)
    for reaction_string in tqdm(reaction_string_dict[family_label]):
        reactant_strings, product_strings = parse_reaction_string(reaction_string)

        reactants = [fragment_dict[reactant_string].species_repr for reactant_string in reactant_strings]
        products = [fragment_dict[product_string].species_repr.molecule[0] for product_string in product_strings]
        
        for idx, reactant in enumerate(reactants):
            for mol in reactant.molecule:
                mol.props['label'] = reactant_strings[idx]
        
        for idx, product in enumerate(products):
            product.props['label'] = product_strings[idx]
        # this script requires reactants to be a list of Species objects
        # products to be a list of Molecule objects.
        # returned rxns have reactants and products in Species type
        new_rxns = database.kinetics.generate_reactions_from_families(reactants=reactants, 
                                                                      products=products, 
                                                                      only_families=[family_label],
                                                                     resonance=True)

        if len(new_rxns) != 1:
            print reaction_string + family_label

            raise Exception('Non-unique reaction is generated with {0}'.format(reaction_string))
        
        # create fragment reactions
        rxn = new_rxns[0]
        
        fragrxn = afm.reaction.FragmentReaction(index=-1,
                                    reversible=True,
                                    family=rxn.family,
                                    reaction_repr=rxn)

        fragment_rxns.append(fragrxn)

2. get thermo and kinetics


In [ ]:
from rmgpy.data.rmg import getDB
from rmgpy.thermo.thermoengine import processThermoData
from rmgpy.thermo import NASA
import rmgpy.constants as constants
import math

In [ ]:
thermodb = getDB('thermo')
# calculate thermo for each species
for fragrxn in tqdm(fragment_rxns):
    rxn0 = fragrxn.reaction_repr
    for spe in rxn0.reactants + rxn0.products:
        thermo0 = thermodb.getThermoData(spe)
        if spe.label in ['RCCCCR', 'LCCCCR', 'LCCCCL']:
            thermo0.S298.value_si += constants.R * math.log(2)
        spe.thermo = processThermoData(spe, thermo0, NASA)
    
    family = getFamilyLibraryObject(rxn0.family)
    # Get the kinetics for the reaction
    kinetics, source, entry, isForward = family.getKinetics(rxn0, \
                                    templateLabels=rxn0.template, degeneracy=rxn0.degeneracy, \
                                    estimator='rate rules', returnAllKinetics=False)
    rxn0.kinetics = kinetics

    if not isForward:
        rxn0.reactants, rxn0.products = rxn0.products, rxn0.reactants
        rxn0.pairs = [(p,r) for r,p in rxn0.pairs]
    
    # convert KineticsData to Arrhenius forms
    if isinstance(rxn0.kinetics, KineticsData):
        rxn0.kinetics = rxn0.kinetics.toArrhenius()
    #  correct barrier heights of estimated kinetics
    if isinstance(rxn0,TemplateReaction) or isinstance(rxn0,DepositoryReaction): # i.e. not LibraryReaction
        rxn0.fixBarrierHeight() # also converts ArrheniusEP to Arrhenius.
    
    fragrxts = [fragment_dict[rxt.label] for rxt in rxn0.reactants]
    fragprds = [fragment_dict[prd.label] for prd in rxn0.products]
    fragpairs = [(fragment_dict[p0.label],fragment_dict[p1.label]) for p0,p1 in rxn0.pairs]
    
    fragrxn.reactants=fragrxts
    fragrxn.products=fragprds
    fragrxn.pairs=fragpairs
    fragrxn.kinetics=rxn0.kinetics

2.1 correct entropy for certain fragments


In [ ]:
for frag in fragments:
    spe = frag.species_repr
    thermo0 = thermodb.getThermoData(spe)
    if spe.label in ['RCCCCR', 'LCCCCR', 'LCCCCL']:
        thermo0.S298.value_si += constants.R * math.log(2)
    
    spe.thermo = processThermoData(spe, thermo0, NASA)
    if spe.label in ['RCCCCR', 'LCCCCR', 'LCCCCL']:
        print spe.label
        print spe.getFreeEnergy(670)/4184

2.2 correct kinetics for reactions with certain fragments


In [ ]:
for fragrxn in tqdm(fragment_rxns):
    rxn0 = fragrxn.reaction_repr
    if rxn0.family in ['R_Recombination', 'H_Abstraction', 'R_Addition_MultipleBond']:
        for spe in rxn0.reactants + rxn0.products:
            if spe.label in ['RCC*CCR', 'LCC*CCR', 'LCC*CCL']:
                rxn0.kinetics.changeRate(4)
        
        fragrxn.kinetics=rxn0.kinetics

3. save in chemkin format


In [ ]:
species_list = []
for frag in fragments:
    species = frag.species_repr
    species_list.append(species)
len(fragments)

In [ ]:
reaction_list = []
for fragrxn in fragment_rxns:
    rxn = fragrxn.reaction_repr
    reaction_list.append(rxn)
len(reaction_list)

In [ ]:
# dump chemkin files
chemkin_path = os.path.join(working_dir, 'chem_annotated.inp')
dictionaryPath = os.path.join(working_dir, 'species_dictionary.txt')
saveChemkinFile(chemkin_path, species_list, reaction_list)
saveSpeciesDictionary(dictionaryPath, species_list)

4. correct atom count in chemkin


In [ ]:
def update_atom_count(tokens, parts, R_count):

	# remove R_count*2 C and R_count*5 H
	string = ''
	if R_count == 0:
		return 'G'.join(parts)
	else:
		H_count = int(tokens[2].split('C')[0])
		H_count_update = H_count - 5*R_count

		C_count = int(tokens[3])
		C_count_update = C_count - 2*R_count

		tokens = tokens[:2] + [str(H_count_update)+'C'] + [C_count_update]

		# Line 1
		string += '{0:<16}        '.format(tokens[0])

		string += '{0!s:<2}{1:>3d}'.format('H', H_count_update)
		string += '{0!s:<2}{1:>3d}'.format('C', C_count_update)
		string += '     ' * (4 - 2)
		string += 'G' + parts[1]
		return string

In [ ]:
corrected_chemkin_path = os.path.join(working_dir, 'chem_annotated.inp')

In [ ]:
output_string = ''
with open(chemkin_path) as f_in:
    readThermo = False
    for line in f_in:
        if line.startswith('THERM ALL'):
            readThermo = True

        if not readThermo: 
            output_string += line
            continue

        if line.startswith('!'): 
            output_string += line
            continue

        if 'G' in line and '1' in line:
            parts = [part for part in line.split('G')]
            tokens = [token.strip() for token in parts[0].split()]

            species_label = tokens[0]
            R_count = species_label.count('R')
            L_count = species_label.count('L')

            updated_line = update_atom_count(tokens, parts, R_count+L_count)

            output_string += updated_line

        else:
            output_string += line

with open(corrected_chemkin_path, 'w') as f_out:
	f_out.write(output_string)

5. add pseudo reactions


In [ ]: